package com.flink.examples.window;

import com.flink.examples.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;

/**
 * @Description 不分区时间滚动窗口
 * @Author JL
 * @Date 2020/09/15
 * @Version V1.0
 */
public class TimeWindowAll {

    /*
    窗口在处理流数据时，通常会对流进行分区；
    数据流划分为：
    keyed（根据key划分不同数据流区）
    non-keyed(指没有按key划分的数据流区，指所有原始数据流)
    */

    /**
     * 遍历集合，返回指定时间滚动窗口下最大年龄数据记录
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple3<String, String, Integer>> dataStream = inStream
                //按时间窗口滑动，对前6秒内的输入数据流，计算一次
                .timeWindowAll(Time.seconds(6))
                //注意：计算变量为f2
                .maxBy(2);
        dataStream.print();
        env.execute("flink TimeWindow job");
    }

    /**
     * 模拟数据持续输出
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3 tuple3 : tuple3List){
                ctx.collect(tuple3);
                //1秒钟输出一个
                Thread.sleep(1 * 1000);
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}
/*
2> (王五,man,29)
 */